{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "editable": true,
    "id": "HtgYO0bXEBrN",
    "slideshow": {
     "slide_type": ""
    },
    "tags": []
   },
   "source": [
    "# TPC-DS 10GiB - Apache Spark acceleration on GPU with RAPIDS Spark\n",
    "\n",
    "based on https://colab.research.google.com/github/LucaCanali/Miscellaneous/blob/master/Performance_Testing/TPCDS_PySpark/Labs_and_Notes/TPCDS_PySpark_getstarted.ipynb#scrollTo=6bab7772"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_version='3.5.0'\n",
    "rapids_version='25.04.0'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 1630,
     "status": "ok",
     "timestamp": 1729291037060,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "Yq230e1Nho_M"
   },
   "outputs": [],
   "source": [
    "%pip install --quiet \\\n",
    "  tpcds_pyspark \\\n",
    "  pyspark=={spark_version} \\\n",
    "  sparkmeasure \\\n",
    "  pandas \\\n",
    "  matplotlib"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Import modules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 1052,
     "status": "ok",
     "timestamp": 1729291488008,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "uq_LmKsB36R_"
   },
   "outputs": [],
   "source": [
    "from importlib.resources import files\n",
    "from pyspark.sql import SparkSession\n",
    "from tpcds_pyspark import TPCDS\n",
    "import glob\n",
    "import os\n",
    "import pandas as pd\n",
    "import re\n",
    "import time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "edMCFrhvgDS8"
   },
   "source": [
    "# Download TPC-DS 10GiB Scale Parquet Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 41530,
     "status": "ok",
     "timestamp": 1729292943990,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "DY8TkhPQTjbB"
   },
   "outputs": [],
   "source": [
    "if not os.path.isdir('tpcds_10'):\n",
    "  if not os.path.isfile('tpcds_10.zip'):\n",
    "    !wget https://sparkdltrigger.web.cern.ch/sparkdltrigger/TPCDS/tpcds_10.zip\n",
    "  !unzip -q tpcds_10.zip"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "6tgF9LWcgUEs"
   },
   "source": [
    "# Init a SparkSession with RAPIDS Spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Detect Scala Version used in PySpark package"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pyspark_files = files('pyspark')\n",
    "spark_sql_jar_path, *_ = glob.glob(f\"{pyspark_files}/*/spark-sql_*jar\")\n",
    "spark_sql_jar = os.path.basename(spark_sql_jar_path)\n",
    "scala_version = re.search(r'^spark-sql_(\\d+.\\d+)-.*\\.jar$', spark_sql_jar).group(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Find spark-measure artifact"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tpcds_pyspark_files = files('tpcds_pyspark')\n",
    "spark_measure_jar_paths = glob.glob(f\"{tpcds_pyspark_files}/spark-measure_{scala_version}-*.jar\")\n",
    "assert spark_measure_jar_paths, f\"No spark-measure artifact built for Pyspark's Scala version {scala_version}\"\n",
    "spark_measure_jar_paths.sort(reverse=True)\n",
    "spark_measure_jar_path, *_ = spark_measure_jar_paths"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 39420,
     "status": "ok",
     "timestamp": 1729289098419,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "-L-wMZTpfYxs"
   },
   "outputs": [],
   "source": [
    "spark = (\n",
    "    SparkSession.builder\n",
    "      .appName('TPCDS PySpark RAPIDS=ON/OFF')\n",
    "      .config('spark.driver.memory', '5g')\n",
    "      .config('spark.plugins', 'com.nvidia.spark.SQLPlugin')\n",
    "      .config('spark.jars', spark_measure_jar_path)\n",
    "      .config('spark.jars.packages', f\"com.nvidia:rapids-4-spark_{scala_version}:{rapids_version}\")\n",
    "      .getOrCreate()\n",
    ")\n",
    "spark\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "_4sYje2NiNA7"
   },
   "source": [
    "# Verify SQL Acceleration on GPU can be enabled by checking the query plan"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 5921,
     "status": "ok",
     "timestamp": 1729289104337,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "nUyQBKtkga9y",
    "outputId": "5d493a51-58de-4aed-bbaf-d73c82769836"
   },
   "outputs": [],
   "source": [
    "spark.conf.set('spark.rapids.sql.enabled', True)\n",
    "sum_df = spark.range(1000).selectExpr('SUM(*)')\n",
    "sum_df.collect()\n",
    "sum_df.explain()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# TPCDS App"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 4,
     "status": "ok",
     "timestamp": 1729289104337,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "BYPgafupcxaY",
    "outputId": "fdfb427f-6cc0-4dff-9295-dc44e6ead132"
   },
   "outputs": [],
   "source": [
    "# https://github.com/LucaCanali/Miscellaneous/tree/master/Performance_Testing/TPCDS_PySpark/tpcds_pyspark/Queries\n",
    "\n",
    "# queries = None to run all (takes much longer)\n",
    "queries = None\n",
    "queries = [\n",
    "    'q14a',\n",
    "    'q14b',\n",
    "    'q23a',\n",
    "    'q23b',\n",
    "    # 'q24a',\n",
    "    # 'q24b',\n",
    "    # 'q88',\n",
    "]\n",
    "\n",
    "demo_start = time.time()\n",
    "tpcds = TPCDS(data_path='./tpcds_10', num_runs=1, queries_repeat_times=1, queries=queries)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "0Yaaw2GfliC5"
   },
   "source": [
    "## Register TPC-DS tables before running queries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 2992,
     "status": "ok",
     "timestamp": 1729289107327,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "kfsHodFqdDl7",
    "outputId": "5a810f9d-e353-456c-b7bb-48ae3290178a"
   },
   "outputs": [],
   "source": [
    "tpcds.map_tables()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "bs6X_54UhuqJ"
   },
   "source": [
    "## Measure Apache Spark GPU"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 45658,
     "status": "ok",
     "timestamp": 1729290819190,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "8vXDasUom70g",
    "outputId": "adccdd7f-99f0-4c82-d600-056b59f53933"
   },
   "outputs": [],
   "source": [
    "tpcds.spark.conf.set('spark.rapids.sql.enabled', True)\n",
    "%time tpcds.run_TPCDS()\n",
    "gpu_grouped_results = tpcds.grouped_results_pdf.copy()\n",
    "gpu_grouped_results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ulyFidEPhg_l"
   },
   "source": [
    "## Measure Apache Spark CPU"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 135425,
     "status": "ok",
     "timestamp": 1729289242749,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "Dg0itS7cdIf4",
    "outputId": "4ce1f8a2-5ac7-4805-e6f6-37a8acb7e039"
   },
   "outputs": [],
   "source": [
    "tpcds.spark.conf.set('spark.rapids.sql.enabled', False)\n",
    "%time tpcds.run_TPCDS()\n",
    "cpu_grouped_results = tpcds.grouped_results_pdf.copy()\n",
    "cpu_grouped_results"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "PcZ12b13h3cq"
   },
   "source": [
    "## Show Speedup Factors achieved by GPU\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 5,
     "status": "ok",
     "timestamp": 1729289293047,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "cJxS9Nqi3AQj"
   },
   "outputs": [],
   "source": [
    "res = pd.merge(cpu_grouped_results, gpu_grouped_results, on='query', how='inner', suffixes=['_cpu', '_gpu'])\n",
    "res['speedup'] = res['elapsedTime_cpu'] / res['elapsedTime_gpu']\n",
    "res = res.sort_values(by='elapsedTime_cpu', ascending=False)\n",
    "res"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "demo_dur = time.time() - demo_start\n",
    "print(f\"CPU and GPU run took: {demo_dur=} seconds\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 510
    },
    "executionInfo": {
     "elapsed": 1041,
     "status": "ok",
     "timestamp": 1729289294084,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "wn7u33fZlUJL",
    "outputId": "8d1ef757-e5c2-4761-fc58-f65f833bdffc"
   },
   "outputs": [],
   "source": [
    "res.plot(title='TPC-DS query elapsedTime on CPU vs GPU (lower is better)', kind='bar', x='query', y=['elapsedTime_cpu', 'elapsedTime_gpu'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 510
    },
    "executionInfo": {
     "elapsed": 381,
     "status": "ok",
     "timestamp": 1729289294462,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "hW-LZponwGQE",
    "outputId": "ed456120-ca7f-4c91-a2bf-de87c2401f0c"
   },
   "outputs": [],
   "source": [
    "res.plot(title='Speedup factors of TPC-DS queries on GPU', kind='bar', x='query', y='speedup' )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "Pk2TR4yimNqP"
   },
   "source": [
    "# Run Queries interactively"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "executionInfo": {
     "elapsed": 4,
     "status": "ok",
     "timestamp": 1729289294462,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "RpIl6NyNzqYU"
   },
   "outputs": [],
   "source": [
    "query = 'q88'\n",
    "with open(f\"{tpcds_pyspark_files}/Queries/{query}.sql\") as f:\n",
    "  q = f.read()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "executionInfo": {
     "elapsed": 3,
     "status": "ok",
     "timestamp": 1729289294462,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "cuCVL1Ed1lQd",
    "outputId": "d256f4b7-e0e2-450c-ba88-aff0d7571510"
   },
   "outputs": [],
   "source": [
    "print(q)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "colab": {
     "base_uri": "https://localhost:8080/",
     "height": 0
    },
    "editable": true,
    "executionInfo": {
     "elapsed": 1470,
     "status": "ok",
     "timestamp": 1729289295930,
     "user": {
      "displayName": "Gera Shegalov",
      "userId": "07399839501144323282"
     },
     "user_tz": 420
    },
    "id": "n4QUdq17040i",
    "outputId": "7d7c7562-fae6-4426-97a7-ec23b8fe2f0d",
    "slideshow": {
     "slide_type": ""
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "spark.conf.set('spark.rapids.sql.enabled', True)\n",
    "df  = spark.sql(q)\n",
    "%time df.collect()"
   ]
  }
 ],
 "metadata": {
  "accelerator": "GPU",
  "colab": {
   "gpuType": "T4",
   "machine_shape": "hm",
   "provenance": []
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
